# Copyright 2025 DeepMind Technologies Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Update block for consistent video depth optimization."""

import torch
from torch import nn
import torch.nn.functional as F


class FlowHead(nn.Module):

  def __init__(self, input_dim=128, hidden_dim=256):
    super(FlowHead, self).__init__()
    self.conv1 = nn.Conv2d(input_dim, hidden_dim, 3, padding=1)
    self.conv2 = nn.Conv2d(hidden_dim, 2, 3, padding=1)
    self.relu = nn.ReLU(inplace=True)

  def forward(self, x):
    return self.conv2(self.relu(self.conv1(x)))


class ConvGRU(nn.Module):
  """GRU with convolution."""

  def __init__(self, hidden_dim=128, input_dim=192 + 128):
    super(ConvGRU, self).__init__()
    self.convz = nn.Conv2d(hidden_dim + input_dim, hidden_dim, 3, padding=1)
    self.convr = nn.Conv2d(hidden_dim + input_dim, hidden_dim, 3, padding=1)
    self.convq = nn.Conv2d(hidden_dim + input_dim, hidden_dim, 3, padding=1)

  def forward(self, h, x):
    hx = torch.cat([h, x], dim=1)

    z = torch.sigmoid(self.convz(hx))
    r = torch.sigmoid(self.convr(hx))
    q = torch.tanh(self.convq(torch.cat([r * h, x], dim=1)))

    h = (1 - z) * h + z * q
    return h


class SepConvGRU(nn.Module):
  """GRU with separate convolution for horizontal and vertical directions."""

  def __init__(self, hidden_dim=128, input_dim=192 + 128):
    super(SepConvGRU, self).__init__()
    self.convz1 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (1, 5), padding=(0, 2)
    )
    self.convr1 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (1, 5), padding=(0, 2)
    )
    self.convq1 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (1, 5), padding=(0, 2)
    )

    self.convz2 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (5, 1), padding=(2, 0)
    )
    self.convr2 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (5, 1), padding=(2, 0)
    )
    self.convq2 = nn.Conv2d(
        hidden_dim + input_dim, hidden_dim, (5, 1), padding=(2, 0)
    )

  def forward(self, h, x):
    # horizontal
    hx = torch.cat([h, x], dim=1)
    z = torch.sigmoid(self.convz1(hx))
    r = torch.sigmoid(self.convr1(hx))
    q = torch.tanh(self.convq1(torch.cat([r * h, x], dim=1)))
    h = (1 - z) * h + z * q

    # vertical
    hx = torch.cat([h, x], dim=1)
    z = torch.sigmoid(self.convz2(hx))
    r = torch.sigmoid(self.convr2(hx))
    q = torch.tanh(self.convq2(torch.cat([r * h, x], dim=1)))
    h = (1 - z) * h + z * q

    return h


class SmallMotionEncoder(nn.Module):
  """Small motion encoder for MegaSaM."""

  def __init__(self, args):
    super(SmallMotionEncoder, self).__init__()
    cor_planes = args.corr_levels * (2 * args.corr_radius + 1) ** 2
    self.convc1 = nn.Conv2d(cor_planes, 96, 1, padding=0)
    self.convf1 = nn.Conv2d(2, 64, 7, padding=3)
    self.convf2 = nn.Conv2d(64, 32, 3, padding=1)
    self.conv = nn.Conv2d(128, 80, 3, padding=1)

  def forward(self, flow, corr):
    cor = F.relu(self.convc1(corr))
    flo = F.relu(self.convf1(flow))
    flo = F.relu(self.convf2(flo))
    cor_flo = torch.cat([cor, flo], dim=1)
    out = F.relu(self.conv(cor_flo))
    return torch.cat([out, flow], dim=1)


class BasicMotionEncoder(nn.Module):
  """Basic motion encoder for MegaSaM."""

  def __init__(self, args):
    super(BasicMotionEncoder, self).__init__()
    cor_planes = args.corr_levels * (2 * args.corr_radius + 1) ** 2
    self.convc1 = nn.Conv2d(cor_planes, 256, 1, padding=0)
    self.convc2 = nn.Conv2d(256, 192, 3, padding=1)
    self.convf1 = nn.Conv2d(2, 128, 7, padding=3)
    self.convf2 = nn.Conv2d(128, 64, 3, padding=1)
    self.conv = nn.Conv2d(64 + 192, 128 - 2, 3, padding=1)

  def forward(self, flow, corr):
    cor = F.relu(self.convc1(corr))
    cor = F.relu(self.convc2(cor))
    flo = F.relu(self.convf1(flow))
    flo = F.relu(self.convf2(flo))

    cor_flo = torch.cat([cor, flo], dim=1)
    out = F.relu(self.conv(cor_flo))
    return torch.cat([out, flow], dim=1)


class SmallUpdateBlock(nn.Module):
  """Small update block for MegaSaM."""

  def __init__(self, args, hidden_dim=96):
    super(SmallUpdateBlock, self).__init__()
    self.encoder = SmallMotionEncoder(args)
    self.gru = ConvGRU(hidden_dim=hidden_dim, input_dim=82 + 64)
    self.flow_head = FlowHead(hidden_dim, hidden_dim=128)

  def forward(self, net, inp, corr, flow):
    motion_features = self.encoder(flow, corr)
    inp = torch.cat([inp, motion_features], dim=1)
    net = self.gru(net, inp)
    delta_flow = self.flow_head(net)

    return net, None, delta_flow


class BasicUpdateBlock(nn.Module):
  """Basic update block for MegaSaM."""

  def __init__(self, args, hidden_dim=128, input_dim=128):
    super(BasicUpdateBlock, self).__init__()
    self.args = args
    self.encoder = BasicMotionEncoder(args)
    self.gru = SepConvGRU(hidden_dim=hidden_dim, input_dim=128 + hidden_dim)
    self.flow_head = FlowHead(hidden_dim, hidden_dim=256)

    self.mask = nn.Sequential(
        nn.Conv2d(128, 256, 3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 64 * 9, 1, padding=0),
    )

  def forward(self, net, inp, corr, flow, upsample=True):
    motion_features = self.encoder(flow, corr)
    inp = torch.cat([inp, motion_features], dim=1)

    net = self.gru(net, inp)
    delta_flow = self.flow_head(net)

    # scale mask to balence gradients
    mask = 0.25 * self.mask(net)
    return net, mask, delta_flow
